package com.shop.zzh.config;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.core.env.Environment;


/**
 * RabbitMQ configuration: message converter, listener container factories,
 * the publishing template, and the exchange/queue/binding declarations.
 *
 * @program: shop
 * @description: message queue configuration class
 * @author: zhuzh
 * @create: 2019-11-21 17:59
 **/
@Configuration
public class RabbitConfig {
	private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);

	@Autowired
	private Environment environment;
	@Autowired
	private CachingConnectionFactory connectionFactory;
	@Autowired
	private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;


	/**
	 * JSON message converter used by the listener factories and the template
	 * declared in this class.
	 *
	 * @return a Jackson based JSON message converter
	 */
	@Bean
	public MessageConverter messageConverter(){
		return new Jackson2JsonMessageConverter();
	}

	/**
	 * Listener container factory for a single consumer: one concurrent consumer,
	 * at most one consumer, prefetch of one and AUTO acknowledge mode.
	 *
	 * @return the factory registered as {@code singleListenerContainer}
	 */
	@Bean(name = "singleListenerContainer")
	public SimpleRabbitListenerContainerFactory listenerContainer(){
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory);
		// Reuse the shared converter bean instead of building a private copy.
		factory.setMessageConverter(messageConverter());
		factory.setConcurrentConsumers(1);
		factory.setMaxConcurrentConsumers(1);
		factory.setPrefetchCount(1);
		factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
		return factory;
	}

	/**
	 * Listener container factory for multiple consumers. The Spring Boot
	 * configurer is applied first; converter, acknowledge mode, concurrency and
	 * prefetch are then set explicitly from the environment.
	 *
	 * @return the factory registered as {@code multiListenerContainer}
	 */
	@Bean(name = "multiListenerContainer")
	public SimpleRabbitListenerContainerFactory multiListenerContainer(){
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		factoryConfigurer.configure(factory,connectionFactory);
		// Reuse the shared converter bean instead of building a private copy.
		factory.setMessageConverter(messageConverter());
		// NOTE(review): NONE means no acknowledgements are sent by the container;
		// confirm that losing messages on consumer failure is acceptable here.
		factory.setAcknowledgeMode(AcknowledgeMode.NONE);
		// NOTE(review): getProperty yields null when a key is missing; confirm
		// these three keys are always defined in the application properties.
		factory.setConcurrentConsumers(environment.getProperty("spring.rabbitmq.listener.concurrency",int.class));
		factory.setMaxConcurrentConsumers(environment.getProperty("spring.rabbitmq.listener.max-concurrency",int.class));
		factory.setPrefetchCount(environment.getProperty("spring.rabbitmq.listener.prefetch",int.class));
		return factory;
	}

	/**
	 * Template used for publishing. Publisher confirms and returns are switched
	 * on for the injected connection factory, and callbacks log the outcome of
	 * each publish.
	 *
	 * @return the configured template
	 */
	@Bean
	public RabbitTemplate rabbitTemplate(){
		connectionFactory.setPublisherConfirms(true);
		connectionFactory.setPublisherReturns(true);
		RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
		// Mandatory flag so unroutable messages are handed to the return callback.
		rabbitTemplate.setMandatory(true);
		rabbitTemplate.setMessageConverter(messageConverter());
		rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
			@Override
			public void confirm(CorrelationData correlationData, boolean ack, String cause) {
				// The callback fires for both ack and nack; only an ack is a success.
				if (ack) {
					logger.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
				} else {
					logger.error("消息发送失败:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
				}
			}
		});
		rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
			@Override
			public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
				// A returned message was not routed to any queue, so report it as a warning.
				logger.warn("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
			}
		});
		return rabbitTemplate;
	}


	/**
	 * Consumer side declarations:
	 * 1. choose the exchange type
	 * 2. bind the queue to the exchange
	 *
	 * FanoutExchange: delivers messages to every bound queue; no routing key concept
	 * HeadersExchange: matches on key-value header attributes
	 * DirectExchange: delivers to the queue selected by routing key
	 * TopicExchange: matches on multiple keywords
	 *
	 * @return a direct exchange named by {@code log.user.exchange.name},
	 *         created with durable=true and autoDelete=false
	 */
	@Bean
	public DirectExchange defaultExchange() {
		return new DirectExchange(environment.getProperty("log.user.exchange.name"),true,false);
	}
	/**
	 * Declares queue A, named by the {@code log.user.queue.name} property.
	 *
	 * @return the durable queue
	 */
	@Bean
	public Queue queueA() {
		String queueName = environment.getProperty("log.user.queue.name");
		logger.debug(queueName);
		return new Queue(queueName, true); // durable queue
	}
	/**
	 * Binds queue A to the direct exchange using the routing key taken from
	 * {@code log.user.routing.key.name}.
	 *
	 * @return the binding
	 */
	@Bean
	public Binding binding() {
		return BindingBuilder.bind(queueA()).to(defaultExchange()).with(environment.getProperty("log.user.routing.key.name"));
	}

}
